I was able to read from both channels of the FPGA's Xilinx DMA (XDMA) engine by running two MIDAS frontends concurrently, one per channel. Alternatively, a single frontend could read both channels using one thread per channel.
This nearly doubled the total read rate, from ~600 MB/s to ~1 GB/s.
frontend.cpp
// NOTE(review): this span is a whitespace-collapsed paste of frontend.cpp. It is restored
// here to one statement per line with its tokens unchanged; in the collapsed form every
// `//` comment swallowed the rest of its physical line, so it could not compile as-is.
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <string.h>
#include <iostream>
#include <fstream>
#include <sstream>
#include <vector>
#include <chrono>
#include <thread>
#include <atomic>
#include <mutex>
#include "midas.h"
#include "odbxx.h"
#include "mfe.h"
#include "xdma_device_read.h"
#include "xdma_device_write.h"
// Define your PCIe devices as pointers to allow dynamic initialization
// (created in frontend_init(), deleted in frontend_exit()).
XDMADeviceRead* deviceRead = nullptr;
XDMADeviceWrite* deviceWrite = nullptr;
// Timing flag. NOTE(review): not referenced anywhere below.
#define ENABLE_TIMING 1
// Globals
const char *frontend_name = "DataSimulator";
const char *frontend_file_name = __FILE__;
BOOL frontend_call_loop = FALSE;
INT display_period = 0;
INT max_event_size = 128 * 1024 * 1024;
INT max_event_size_frag = 5 * max_event_size;
INT event_buffer_size = 5 * max_event_size;
INT frontend_index; // frontend index from command line argument -i
char settings_path[100];
// Define a vector to store 16-bit words. NOTE(review): not referenced anywhere below.
std::vector<int16_t> data;
size_t write_size = 1;
size_t read_size = 1;
// Global variable to keep track of the last poll time
std::chrono::steady_clock::time_point last_poll_time;
std::chrono::microseconds polling_interval(1000 * 1000);
std::chrono::microseconds write_sleep_interval(1000 * 1000);
// Global atomic flags and mutex
std::atomic<bool> write_thread_active(false); // Initialized to false
std::atomic<bool> new_data_available(false);
std::mutex settings_mutex;
bool read_only = false; // Global variable for Read Only flag
// Verbosity flag.
// NOTE(review): plain bool, but read on the detached writer thread while begin_of_run() writes it.
bool verbose = false;
// Function to start timing, returns the start time
std::chrono::steady_clock::time_point start_timing() {
    return std::chrono::steady_clock::now();
}
// Function to end timing, accepts the start time and message (prints only when verbose)
void end_timing(const std::chrono::steady_clock::time_point& start_time, const std::string& msg) {
    if (verbose) {
        auto end_time = std::chrono::steady_clock::now();
        long long duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count();
        std::cout << msg << " took " << duration << " µs" << std::endl;
    }
}
// Function to perform writing operations in a separate thread.
// Writes a constant buffer of write_size int16_t words, flags new_data_available, then
// sleeps write_sleep_interval, until write_thread_active is cleared.
// NOTE(review): write_size / write_sleep_interval are read live, and write_value is
// incremented but never used after the buffer is filled.
void write_thread_function() {
    int16_t write_value = 0;
    std::vector<int16_t> buffer(write_size);
    // Generate data once if the value doesn't change
    auto time_start1 = start_timing();
    std::fill_n(buffer.data(), write_size, write_value);
    end_timing(time_start1, "Generate Data Operation");
    while (write_thread_active) {
        auto time_start2 = start_timing();
        deviceWrite->writeToDevice(0, buffer.data(), write_size * sizeof(int16_t));
        end_timing(time_start2, "PCIe Write Operation");
        if (verbose) {
            deviceWrite->printTransferSpeed();
        }
        new_data_available = true; // Indicate that new data is available
        write_value += 1;
        std::this_thread::sleep_for(write_sleep_interval);
    }
}
// Function declarations
INT frontend_init(void);
INT frontend_exit(void);
INT begin_of_run(INT run_number, char *error);
INT end_of_run(INT run_number, char *error);
INT pause_run(INT run_number, char *error);
INT resume_run(INT run_number, char *error);
INT frontend_loop(void);
INT read_trigger_event(char *pevent, INT off);
INT read_periodic_event(char *pevent, INT off);
INT poll_event(INT source, INT count, BOOL test);
INT interrupt_configure(INT cmd, INT source, POINTER_T adr);
// Equipment list
BOOL equipment_common_overwrite = TRUE;
EQUIPMENT equipment[] = {
    {"Data Simulator %02d",
        {2, 0,
        "BUF%02d",
        EQ_POLLED,
        0,
        "MIDAS",
        TRUE,
        RO_RUNNING, // Removed RO_ODB flag
        1, // poll time in milliseconds
        0,
        0,
        TRUE,
        "", "", "",},
        read_trigger_event
    },
    {""}
};
// Trigger Update (empty; not registered anywhere in this code)
void trigger_update(INT hDB, INT hkey, void*) {
}
// Frontend Init: builds the per-instance ODB settings path, resolves device paths
// (replacing the "initial_value" placeholder), and creates/initializes both devices.
int frontend_init() {
    //Get settings path for this frontend
    frontend_index = get_frontend_index();
    const char* unformatted_settings_path = "/Equipment/Data Simulator %02d/Settings";
    snprintf(settings_path, sizeof(settings_path), unformatted_settings_path, frontend_index);
    // Define ODB settings with default values
    midas::odb o = {
        {"Polling Interval (us)", 1000},
        {"Write Size", 1024},
        {"Read Size", 1024},
        {"Write Sleep Interval (us)", 10},
        {"Read Only", FALSE},
        {"Verbose", FALSE},
        {"Device Read Path", "initial_value"},
        {"Device Write Path", "initial_value"}
    };
    // Connect to the ODB path
    o.connect(settings_path);
    // Retrieve device paths from ODB settings
    std::string read_path = static_cast<std::string>(o["Device Read Path"]);
    std::string write_path = static_cast<std::string>(o["Device Write Path"]);
    // Check and update Device Read Path if it has the initial placeholder value
    if (read_path == "initial_value") {
        std::string new_read_path = "/dev/xdma0_c2h_0";
        o["Device Read Path"] = new_read_path;
        read_path = new_read_path; // Update local variable to reflect the new path
    }
    // Check and update Device Write Path if it has the initial placeholder value
    if (write_path == "initial_value") {
        std::string new_write_path = "/dev/xdma0_h2c_0";
        o["Device Write Path"] = new_write_path;
        write_path = new_write_path; // Update local variable to reflect the new path
    }
    // Initialize devices with paths from ODB
    deviceRead = new XDMADeviceRead(read_path.c_str());
    deviceWrite = new XDMADeviceWrite(write_path.c_str());
    // Initialize the devices
    deviceRead->initialize();
    deviceWrite->initialize();
    return SUCCESS;
}
// Frontend Exit: stops the writer thread (fixed 100 ms grace period) and frees the devices.
INT frontend_exit() {
    if (write_thread_active) {
        write_thread_active = false;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    // Clean up dynamically allocated devices
    if (deviceRead) {
        delete deviceRead;
        deviceRead = nullptr;
    }
    if (deviceWrite) {
        delete deviceWrite;
        deviceWrite = nullptr;
    }
    return SUCCESS;
}
// Begin of Run: reloads settings from the ODB and starts a detached writer thread
// unless "Read Only" is set.
INT begin_of_run(INT run_number, char *error) {
    {
        std::lock_guard<std::mutex> lock(settings_mutex);
        //Supposedly you can use `midas::odb settings(settings_path);` but that segfaults for some reason.
        midas::odb settings = { };
        settings.connect(settings_path);
        polling_interval = std::chrono::microseconds(static_cast<int>(settings["Polling Interval (us)"]));
        write_size = static_cast<size_t>(static_cast<int>(settings["Write Size"]));
        read_size = static_cast<size_t>(static_cast<int>(settings["Read Size"]));
        write_sleep_interval = std::chrono::microseconds(static_cast<int>(settings["Write Sleep Interval (us)"]));
        read_only = static_cast<bool>(settings["Read Only"]);
        verbose = static_cast<bool>(settings["Verbose"]);
    }
    if (!read_only) {
        write_thread_active = true;
        std::thread write_thread(write_thread_function);
        write_thread.detach();
    } else {
        write_thread_active = false;
    }
    return SUCCESS;
}
// End of Run: asks the writer thread to stop and waits a fixed 100 ms.
INT end_of_run(INT run_number, char *error) {
    if (write_thread_active) {
        write_thread_active = false;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return SUCCESS;
}
// Pause Run
INT pause_run(INT run_number, char *error) {
    return SUCCESS;
}
// Resume Run
INT resume_run(INT run_number, char *error) {
    return SUCCESS;
}
// Frontend Loop
INT frontend_loop() {
    return SUCCESS;
}
// Poll Event: at most once per polling_interval, reports TRUE in read-only mode, or
// (in write mode) when the writer has flagged new data, clearing the flag unless `test`.
INT poll_event(INT source, INT count, BOOL test) {
    auto now = std::chrono::steady_clock::now();
    if (now - last_poll_time >= polling_interval) {
        last_poll_time = now;
        if (read_only) {
            // In read-only mode, assume data is always available
            if (test) {
                return TRUE;
            }
            return TRUE;
        } else {
            // In write mode, check if new data is available
            if (new_data_available) {
                if (test) {
                    return TRUE;
                }
                new_data_available = false; // Reset the flag after acknowledging
                return TRUE;
            }
        }
    }
    if (test) {
        return FALSE;
    }
    return FALSE;
}
// Interrupt Configuration (all commands ignored)
INT interrupt_configure(INT cmd, INT source, POINTER_T adr) {
    switch (cmd) {
        case CMD_INTERRUPT_ENABLE:
            break;
        case CMD_INTERRUPT_DISABLE:
            break;
        case CMD_INTERRUPT_ATTACH:
            break;
        case CMD_INTERRUPT_DETACH:
            break;
    }
    return SUCCESS;
}
// Event Readout: copies one device read into a TID_SHORT bank.
// NOTE(review): "CR%02d" is passed to bk_create() unformatted, and the memcpy is not
// bounded by max_event_size.
INT read_trigger_event(char *pevent, INT off) {
    bk_init32(pevent);
    short *pdata;
    bk_create(pevent, "CR%02d", TID_SHORT, (void **)&pdata);
    auto time_start1 = start_timing();
    std::vector<std::byte> read_buffer = deviceRead->readFromDevice(0, read_size);
    end_timing(time_start1, "PCIe Read Operation");
    if (verbose) {
        deviceRead->printTransferSpeed();
    }
    auto time_start2 = start_timing();
    // Ensure that the size of pdata is sufficient
    size_t num_bytes = read_buffer.size();
    size_t num_shorts = num_bytes / sizeof(short);
    // Perform a bulk copy
    memcpy(pdata, read_buffer.data(), num_bytes);
    // Advance pdata pointer
    pdata += num_shorts;
    end_timing(time_start2, "Memory Copy Operation");
    bk_close(pevent, pdata);
    return bk_size(pevent);
}
// Periodic Event: fixed 1024-unit read into a TID_SHORT bank (not registered in equipment[]).
INT read_periodic_event(char *pevent, INT off) {
    bk_init32(pevent);
    short *pdata;
    bk_create(pevent, "CR%02d", TID_SHORT, (void **)&pdata);
    size_t size = 1024;
    std::vector<std::byte> buffer = deviceRead->readFromDevice(0, size);
    size_t num_bytes = buffer.size();
    size_t num_shorts = num_bytes / sizeof(short);
    memcpy(pdata, buffer.data(), num_bytes);
    pdata += num_shorts;
    bk_close(pevent, pdata);
    return bk_size(pevent);
}
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <string.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <exception>
#include <fstream>
#include <iostream>
#include <mutex>
#include <sstream>
#include <thread>
#include <vector>
#include "midas.h"
#include "odbxx.h"
#include "mfe.h"
#include "xdma_device_read.h"
#include "xdma_device_write.h"
// Define your PCIe devices as pointers to allow dynamic initialization.
// Created in frontend_init() and deleted in frontend_exit().
XDMADeviceRead* deviceRead = nullptr;
XDMADeviceWrite* deviceWrite = nullptr;
// Timing flag. NOTE(review): not referenced in this file; timing output is controlled
// at runtime by `verbose` instead.
#define ENABLE_TIMING 1
// Globals expected by the MIDAS frontend framework
const char *frontend_name = "DataSimulator";
const char *frontend_file_name = __FILE__;
BOOL frontend_call_loop = FALSE;
INT display_period = 0;
// 128 MiB maximum event; the fragment and event-buffer sizes are five times that.
INT max_event_size = 128 * 1024 * 1024;
INT max_event_size_frag = 5 * max_event_size;
INT event_buffer_size = 5 * max_event_size;
INT frontend_index; // frontend index from command line argument -i
// ODB settings path of this frontend instance, built by frontend_init().
char settings_path[100];
// Vector of 16-bit words. NOTE(review): not referenced in this file.
std::vector<int16_t> data;
// Transfer sizes, reloaded from the ODB by begin_of_run(). write_size counts int16_t
// words; read_size is passed unchanged to readFromDevice().
size_t write_size = 1;
size_t read_size = 1;
// Time of the last poll_event() call that passed the polling-interval check.
std::chrono::steady_clock::time_point last_poll_time;
// Initial intervals (1 s); begin_of_run() replaces them with the ODB values.
std::chrono::microseconds polling_interval(1000 * 1000);
std::chrono::microseconds write_sleep_interval(1000 * 1000);
// Flags shared between the MIDAS thread and the writer thread.
std::atomic<bool> write_thread_active(false); // true while the writer thread should keep running
std::atomic<bool> new_data_available(false);  // set by the writer, consumed by poll_event()
std::mutex settings_mutex;                     // held while begin_of_run() reloads settings
bool read_only = false; // Read Only flag: when set, begin_of_run() starts no writer thread
// Verbosity flag: enables timing and transfer-speed prints.
// Atomic because the detached writer thread reads it while begin_of_run() may rewrite
// it for the next run; a plain bool here is a data race (undefined behavior).
std::atomic<bool> verbose(false);
// Capture the current monotonic timestamp; pass it to end_timing() to report the
// elapsed time of the operation that follows.
std::chrono::steady_clock::time_point start_timing() {
    const auto timestamp = std::chrono::steady_clock::now();
    return timestamp;
}
// Print "<msg> took <N> µs", where N is the time elapsed since `start_time`.
// Silent unless the global `verbose` flag is set.
void end_timing(const std::chrono::steady_clock::time_point& start_time, const std::string& msg) {
    if (!verbose) {
        return;
    }
    using std::chrono::duration_cast;
    using std::chrono::microseconds;
    const auto elapsed = std::chrono::steady_clock::now() - start_time;
    const long long elapsed_us = duration_cast<microseconds>(elapsed).count();
    std::cout << msg << " took " << elapsed_us << " µs" << std::endl;
}
// Function to perform writing operations in a separate thread
void write_thread_function() {
int16_t write_value = 0;
std::vector<int16_t> buffer(write_size);
// Generate data once if the value doesn't change
auto time_start1 = start_timing();
std::fill_n(buffer.data(), write_size, write_value);
end_timing(time_start1, "Generate Data Operation");
while (write_thread_active) {
auto time_start2 = start_timing();
deviceWrite->writeToDevice(0, buffer.data(), write_size * sizeof(int16_t));
end_timing(time_start2, "PCIe Write Operation");
if (verbose) {
deviceWrite->printTransferSpeed();
}
new_data_available = true; // Indicate that new data is available
write_value += 1;
std::this_thread::sleep_for(write_sleep_interval);
}
}
// Forward declarations of the callbacks used by the MIDAS framework and by the
// equipment table that follows.
// Frontend lifecycle
INT frontend_init(void);
INT frontend_exit(void);
INT frontend_loop(void);
// Run transitions
INT begin_of_run(INT run_number, char *error);
INT end_of_run(INT run_number, char *error);
INT pause_run(INT run_number, char *error);
INT resume_run(INT run_number, char *error);
// Polling and interrupts
INT poll_event(INT source, INT count, BOOL test);
INT interrupt_configure(INT cmd, INT source, POINTER_T adr);
// Event readout
INT read_trigger_event(char *pevent, INT off);
INT read_periodic_event(char *pevent, INT off);
// Equipment list
// NOTE(review): with equipment_common_overwrite = TRUE, mfe presumably rewrites the
// ODB "Common" settings from this table at startup - confirm against the MIDAS docs.
BOOL equipment_common_overwrite = TRUE;
// A single polled equipment read out by read_trigger_event(); read_periodic_event()
// is declared but not registered here. The "%02d" in the name and buffer is presumably
// filled in by mfe with the -i frontend index (frontend_init() builds the matching
// "/Equipment/Data Simulator %02d/Settings" path).
// NOTE(review): the per-field labels below assume the usual MIDAS EQUIPMENT_INFO order
// (event ID, trigger mask, buffer, type, source, format, enabled, read-on, period,
// event limit, sub-events, history, host/name/file) - verify against midas.h.
EQUIPMENT equipment[] = {
    {"Data Simulator %02d",
        {2, 0,            // event ID, trigger mask (assumed)
        "BUF%02d",        // event buffer name (assumed)
        EQ_POLLED,
        0,                // event source (assumed)
        "MIDAS",          // data format (assumed)
        TRUE,             // enabled (assumed)
        RO_RUNNING, // Removed RO_ODB flag
        1, // poll time in milliseconds
        0,
        0,
        TRUE,
        "", "", "",},
        read_trigger_event
    },
    {""}              // empty entry terminates the list
};
// Trigger Update
// ODB callback placeholder: does nothing and is not registered anywhere in this file.
void trigger_update(INT /*hDB*/, INT /*hkey*/, void * /*info*/) {
    // Intentionally empty.
}
// Frontend Init
int frontend_init() {
//Get settings path for this frontend
frontend_index = get_frontend_index();
const char* unformatted_settings_path = "/Equipment/Data Simulator %02d/Settings";
snprintf(settings_path, sizeof(settings_path), unformatted_settings_path, frontend_index);
// Define ODB settings with default values
midas::odb o = {
{"Polling Interval (us)", 1000},
{"Write Size", 1024},
{"Read Size", 1024},
{"Write Sleep Interval (us)", 10},
{"Read Only", FALSE},
{"Verbose", FALSE},
{"Device Read Path", "initial_value"},
{"Device Write Path", "initial_value"}
};
// Connect to the ODB path
o.connect(settings_path);
// Retrieve device paths from ODB settings
std::string read_path = static_cast<std::string>(o["Device Read Path"]);
std::string write_path = static_cast<std::string>(o["Device Write Path"]);
// Check and update Device Read Path if it has the initial placeholder value
if (read_path == "initial_value") {
std::string new_read_path = "/dev/xdma0_c2h_0";
o["Device Read Path"] = new_read_path;
read_path = new_read_path; // Update local variable to reflect the new path
}
// Check and update Device Write Path if it has the initial placeholder value
if (write_path == "initial_value") {
std::string new_write_path = "/dev/xdma0_h2c_0";
o["Device Write Path"] = new_write_path;
write_path = new_write_path; // Update local variable to reflect the new path
}
// Initialize devices with paths from ODB
deviceRead = new XDMADeviceRead(read_path.c_str());
deviceWrite = new XDMADeviceWrite(write_path.c_str());
// Initialize the devices
deviceRead->initialize();
deviceWrite->initialize();
return SUCCESS;
}
// Frontend Exit
// Called once at shutdown. If the writer thread is running, it is asked to stop and
// given 100 ms to leave its loop; then both devices are freed.
INT frontend_exit() {
    const bool writer_was_running = write_thread_active.exchange(false);
    if (writer_was_running) {
        // The writer thread is detached, so it cannot be joined; this fixed delay is what
        // keeps it from touching deviceWrite after the delete below.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    // delete on a null pointer is a no-op, so no null checks are needed.
    delete deviceRead;
    deviceRead = nullptr;
    delete deviceWrite;
    deviceWrite = nullptr;
    return SUCCESS;
}
// Begin of Run
// Reloads this frontend's settings from its ODB Settings tree and, unless "Read Only"
// is set, starts a detached writer thread for the run. Always returns SUCCESS.
INT begin_of_run(INT run_number, char *error) {
    // ODB integers are signed: clamp negatives to 0 so they cannot wrap into enormous
    // size_t values (a "Write Size" of -1 would make the writer try to allocate ~2^64
    // words, and a negative "Read Size" would request a huge device read).
    auto non_negative = [](int value) { return value < 0 ? 0 : value; };
    {
        std::lock_guard<std::mutex> lock(settings_mutex);
        //Supposedly you can use `midas::odb settings(settings_path);` but that segfaults for some reason.
        midas::odb settings = { };
        settings.connect(settings_path);
        polling_interval = std::chrono::microseconds(non_negative(static_cast<int>(settings["Polling Interval (us)"])));
        write_size = static_cast<size_t>(non_negative(static_cast<int>(settings["Write Size"])));
        read_size = static_cast<size_t>(non_negative(static_cast<int>(settings["Read Size"])));
        write_sleep_interval = std::chrono::microseconds(non_negative(static_cast<int>(settings["Write Sleep Interval (us)"])));
        read_only = static_cast<bool>(settings["Read Only"]);
        verbose = static_cast<bool>(settings["Verbose"]);
    }
    // Drop a data-ready flag left over from the previous run, so the first poll of this
    // run does not trigger a readout before this run's writer has written anything.
    new_data_available = false;
    if (!read_only) {
        write_thread_active = true;
        std::thread write_thread(write_thread_function);
        write_thread.detach();
    } else {
        write_thread_active = false;
    }
    return SUCCESS;
}
// End of Run
// Asks the writer thread (if one is running) to stop and gives it 100 ms to leave its loop.
INT end_of_run(INT run_number, char *error) {
    const bool writer_was_running = write_thread_active.exchange(false);
    if (writer_was_running) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    return SUCCESS;
}
// Pause Run
INT pause_run(INT run_number, char *error) {
return SUCCESS;
}
// Resume Run
INT resume_run(INT run_number, char *error) {
return SUCCESS;
}
// Frontend Loop
// No per-iteration work is needed; always reports success.
INT frontend_loop() {
    return SUCCESS;
}
// Poll Event
// Polled by the framework for the EQ_POLLED equipment. Returns TRUE at most once per
// polling_interval:
//   - always in read-only mode;
//   - in write mode, only after the writer thread has set new_data_available.
// With `test` set, the data-ready flag is reported but not consumed.
INT poll_event(INT source, INT count, BOOL test) {
    const auto now = std::chrono::steady_clock::now();
    if (now - last_poll_time < polling_interval) {
        return FALSE;
    }
    last_poll_time = now;

    if (read_only) {
        // In read-only mode, assume data is always available.
        return TRUE;
    }
    if (test) {
        // Report availability without acknowledging it.
        return new_data_available ? TRUE : FALSE;
    }
    // Test-and-clear in a single atomic step. With a separate load and store, a flag set
    // by the writer thread between the two would be silently cleared.
    return new_data_available.exchange(false) ? TRUE : FALSE;
}
// Interrupt Configuration
// The equipment is polled, so every interrupt command (enable, disable, attach, detach,
// or anything else) is accepted and ignored.
INT interrupt_configure(INT cmd, INT source, POINTER_T adr) {
    (void)cmd;
    (void)source;
    (void)adr;
    return SUCCESS;
}
// Event Readout
// Reads read_size (passed unchanged to readFromDevice) from the C2H device and stores
// the result as one TID_SHORT bank named "CRnn", where nn is this frontend's -i index.
// Returns the event size reported by bk_size().
INT read_trigger_event(char *pevent, INT off) {
    bk_init32(pevent);

    // MIDAS bank names are exactly four characters and bk_create() does not expand
    // printf-style formats, so "CR%02d" must be formatted here (passed directly it
    // produced the bank name "CR%0").
    char bank_name[5];
    snprintf(bank_name, sizeof(bank_name), "CR%02d", frontend_index);

    short *pdata;
    bk_create(pevent, bank_name, TID_SHORT, (void **)&pdata);

    auto time_start1 = start_timing();
    std::vector<std::byte> read_buffer = deviceRead->readFromDevice(0, read_size);
    end_timing(time_start1, "PCIe Read Operation");
    if (verbose) {
        deviceRead->printTransferSpeed();
    }

    auto time_start2 = start_timing();
    // The bank data is written straight into the framework's event buffer, which is
    // presumably limited to max_event_size bytes including the event and bank headers.
    // Never copy past that limit; the reserve is a generous allowance for the headers.
    // NOTE(review): confirm the exact per-event limit against mfe.
    const size_t header_reserve = 1024;
    const size_t event_limit = static_cast<size_t>(max_event_size);
    const size_t max_payload_bytes = event_limit > header_reserve ? event_limit - header_reserve : 0;
    size_t num_bytes = read_buffer.size();
    if (num_bytes > max_payload_bytes) {
        std::cerr << "read_trigger_event: read " << num_bytes << " bytes, truncating to "
                  << max_payload_bytes << " to fit max_event_size" << std::endl;
        num_bytes = max_payload_bytes;
    }
    // A bank of 16-bit words cannot represent a trailing odd byte, so it is not copied.
    const size_t num_shorts = num_bytes / sizeof(short);
    memcpy(pdata, read_buffer.data(), num_shorts * sizeof(short));
    pdata += num_shorts;
    end_timing(time_start2, "Memory Copy Operation");

    bk_close(pevent, pdata);
    return bk_size(pevent);
}
// Periodic Event
// Requests a fixed 1024 (presumably bytes) from the C2H device and stores the result
// as one TID_SHORT bank named "CRnn", where nn is this frontend's -i index. Returns the
// event size reported by bk_size(). NOTE(review): not registered in equipment[].
INT read_periodic_event(char *pevent, INT off) {
    bk_init32(pevent);

    // MIDAS bank names are exactly four characters and bk_create() does not expand
    // printf-style formats, so "CR%02d" must be formatted here.
    char bank_name[5];
    snprintf(bank_name, sizeof(bank_name), "CR%02d", frontend_index);

    short *pdata;
    bk_create(pevent, bank_name, TID_SHORT, (void **)&pdata);

    const size_t size = 1024;
    std::vector<std::byte> buffer = deviceRead->readFromDevice(0, size);
    // A bank of 16-bit words cannot represent a trailing odd byte, so it is not copied.
    const size_t num_shorts = buffer.size() / sizeof(short);
    memcpy(pdata, buffer.data(), num_shorts * sizeof(short));
    pdata += num_shorts;

    bk_close(pevent, pdata);
    return bk_size(pevent);
}